home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- import urllib
- import httplib
- import platform
- import threading
- import tempfile
- import traceback
- import os
- import sys
- from xml.etree.ElementTree import XML
- from import Device
# Public API of this module: only the OpenPrinting query class is exported
# by "from <module> import *".
__all__ = ['OpenPrinting']
-
def _normalize_space(text):
    '''
    Tidy up the whitespace of a piece of text taken from an XML reply.

    Leading and trailing whitespace is stripped, every newline is turned
    into a space, and each run of consecutive spaces is then reduced to a
    single space.  Other whitespace inside the text (e.g. tabs) is kept.

    @type text: string
    @param text: text to normalize
    @return: the normalized text
    '''
    # The needle has to be TWO spaces.  It is built with a repeat so that
    # it cannot silently degrade to a single space (which would make the
    # loop below spin forever on any text containing a space).
    double_space = ' ' * 2
    result = text.strip().replace('\n', ' ')
    while double_space in result:
        result = result.replace(double_space, ' ')
    return result
-
-
class _QueryThread(threading.Thread):
    '''
    Daemon thread that sends one query to the parent's query.cgi and
    hands the outcome to a callback.

    The callback is invoked from this thread as
    callback(status, user_data, result):
      - status 0 and the response body when the server answered 200;
      - the HTTP status code and None for any other server answer;
      - status 1 and a sys.exc_info() tuple when the request raised.
    Setting the callback attribute to None before the query finishes
    suppresses the notification.
    '''

    def __init__(self, parent, parameters, callback, user_data=None):
        '''
        @param parent: object providing base_url (host name) and language
        (its first element is sent as uilanguage and locale)
        @type parameters: dict
        @param parameters: URL parameters for the query
        @type callback: function
        @param callback: called with (status, user_data, result), or None
        @param user_data: passed through to the callback unchanged
        '''
        threading.Thread.__init__(self)
        self.parent = parent
        self.parameters = parameters
        self.callback = callback
        self.user_data = user_data
        # Daemon: an unfinished query must not keep the process alive.
        self.daemon = True

    def run(self):
        '''
        Perform the POST request and report the outcome to the callback.
        '''
        query_command = '/query.cgi'
        headers = {
            'Content-type': 'application/x-www-form-urlencoded',
            'Accept': 'text/plain',
        }
        language = self.parent.language[0]
        params = '%s&uilanguage=%s&locale=%s' % (
            urllib.urlencode(self.parameters), language, language)
        # Equivalent GET URL, kept on the thread object for inspection.
        self.url = 'http://%s%s?%s' % (self.parent.base_url,
                                       query_command, params)
        result = None
        status = 1
        conn = None

        try:
            conn = httplib.HTTPConnection(self.parent.base_url)
            conn.request('POST', query_command, params, headers)
            resp = conn.getresponse()
            status = resp.status
            if status == 200:
                result = resp.read()
                # Only a fully read 200 reply counts as success.
                status = 0
        except Exception:
            # Reset the status: the failure may have happened after a 200
            # status line had already been received.
            status = 1
            result = sys.exc_info()
        finally:
            if conn is not None:
                conn.close()

        # Read the attribute once: it may be cleared from another thread
        # (cancellation) between the check and the call.
        callback = self.callback
        if callback is not None:
            callback(status, self.user_data, result)
-
-
-
def _child_texts(parent, tags):
    '''Return a {tag: text} dict for each of tags found as a child of parent.'''
    texts = { }
    for tag in tags:
        element = parent.find(tag)
        if element is not None:
            texts[tag] = element.text
    return texts


def _parse_printers(xml_text):
    '''
    Extract the printers from the XML reply to a "printers" query.

    @return: dict mapping each printer's id text to "make model"
    '''
    printers = { }
    for printer in XML(xml_text).findall('printer'):
        id_element = printer.find('id')
        make_element = printer.find('make')
        model_element = printer.find('model')
        if (id_element is None or make_element is None or
                model_element is None):
            continue

        printer_id = id_element.text
        make = make_element.text
        model = model_element.text
        # Entries with an empty id, make or model are left out.
        if printer_id and make and model:
            printers[printer_id] = make + ' ' + model

    return printers


def _parse_packages(driver):
    '''
    Collect the downloadable packages of a <driver> element.

    @return: dict mapping each child tag of <packages> to a dict of
    {file attribute: package details}; empty without a <packages> child
    '''
    packages = { }
    container = driver.find('packages')
    if container is None:
        return packages

    for arch in container:
        files = { }
        for package in arch.findall('package'):
            details = _child_texts(package, ('realversion', 'version',
                                             'release', 'url', 'pkgsys'))
            repositories = package.find('repositories')
            if repositories is not None:
                for pkgsys in repositories:
                    details.setdefault('repositories',
                                       { })[pkgsys.tag] = pkgsys.text

            files[package.attrib['file']] = details

        packages[arch.tag] = files

    return packages


def _parse_driver(driver):
    '''
    Convert one <driver> element into a dict of its properties.

    @return: the dict, or None when the driver has no name or no url
    '''
    info = { }
    for tag in ('name', 'url', 'supplier', 'license', 'shortdescription'):
        element = driver.find(tag)
        if element is not None and element.text is not None:
            info[tag] = _normalize_space(element.text)

    # Drivers lacking a name or a URL are not reported at all.
    if 'name' not in info or 'url' not in info:
        return None

    element = driver.find('licensetext')
    if element is not None:
        info['licensetext'] = element.text

    # These flags are conveyed by the mere presence of the element.
    for flag in ('nonfreesoftware', 'recommended', 'patents',
                 'thirdpartysupplied', 'manufacturersupplied'):
        info[flag] = driver.find(flag) is not None

    info['freesoftware'] = not info['nonfreesoftware']

    supportcontacts = []
    container = driver.find('supportcontacts')
    if container is not None:
        for sc in container.findall('supportcontact'):
            if sc.text is not None:
                name = _normalize_space(sc.text)
            else:
                name = ''
            supportcontacts.append({
                'name': name,
                'url': sc.attrib.get('url'),
                'level': sc.attrib.get('level'),
            })

    if supportcontacts:
        info['supportcontacts'] = supportcontacts

    container = driver.find('functionality')
    if container is not None:
        functionality = _child_texts(container, ('text', 'lineart',
                                                 'graphics', 'photo',
                                                 'speed'))
        if functionality:
            info[container.tag] = functionality

    packages = _parse_packages(driver)
    if packages:
        info['packages'] = packages

    container = driver.find('ppds')
    if container is not None:
        ppds = [each.text for each in container]
        if ppds:
            info['ppds'] = ppds

    return info


def _parse_drivers(xml_text):
    '''
    Extract the drivers from the XML reply to a "drivers" query.

    @return: dict mapping each driver's id attribute to its property dict
    '''
    drivers = { }
    for driver in XML(xml_text).findall('driver'):
        driver_id = driver.attrib.get('id')
        if driver_id is None:
            continue

        info = _parse_driver(driver)
        if info is not None:
            drivers[driver_id] = info

    return drivers


class OpenPrinting:
    '''
    Client for the query interface at www.openprinting.org.

    Every query runs in its own thread; the outcome is delivered to a
    callback that is called from that thread.
    '''

    def __init__(self, language=None):
        '''
        @param language: language, as given by the first element of
        locale.setlocale().  When omitted, the value of
        locale.getlocale(locale.LC_MESSAGES) is used, or 'C' if that
        fails.  Queries send the first element of this value.
        @type language: string
        '''
        if language is None:
            import locale

            try:
                language = locale.getlocale(locale.LC_MESSAGES)
            except locale.Error:
                language = 'C'

        self.language = language
        self.base_url = 'www.openprinting.org'
        # Sent as the onlyfree / onlymanufacturer parameters by listDrivers.
        self.onlyfree = 1
        self.onlymanufacturer = 0

    def cancelOperation(self, handle):
        '''
        Cancel an operation.

        The query itself is not interrupted; its callback is cleared so
        that the result is discarded.

        @param handle: query/operation handle
        '''
        try:
            handle.callback = None
        except (AttributeError, TypeError):
            # Best effort: a handle that cannot take the attribute (for
            # example None) is silently ignored.
            pass

    def webQuery(self, parameters, callback, user_data=None):
        '''
        Run a web query for a driver.

        @type parameters: dict
        @param parameters: URL parameters
        @type callback: function
        @param callback: callback function, taking (integer, user_data, string)
        parameters with the first parameter being the status code, zero for
        success
        @return: query handle
        '''
        the_thread = _QueryThread(self, parameters, callback, user_data)
        the_thread.start()
        return the_thread

    def searchPrinters(self, searchterm, callback, user_data=None):
        '''
        Search for printers using a search term.

        @type searchterm: string
        @param searchterm: search term
        @type callback: function
        @param callback: callback function, taking (integer, user_data,
        result) parameters with the first parameter being the status code,
        zero for success.  On success the result is a dict mapping printer
        id to "make model"; if the reply cannot be parsed the status is 1
        and the result is a sys.exc_info() tuple; if the query itself
        failed its status and result are passed on unchanged.
        @return: query handle
        '''

        def parse_result(status, data, result):
            (callback, user_data) = data
            if status != 0:
                callback(status, user_data, result)
                return

            try:
                printers = _parse_printers(result)
            except Exception:
                status = 1
                printers = sys.exc_info()

            try:
                callback(status, user_data, printers)
            except Exception:
                # An exception from the callback is printed rather than
                # raised.
                (exc_type, exc_value, tb) = sys.exc_info()
                extxt = traceback.format_exception_only(exc_type, exc_value)
                for line in traceback.format_tb(tb):
                    print(line.strip())

                print(extxt[0].strip())

        params = {
            'type': 'printers',
            'printer': searchterm,
            'format': 'xml',
        }
        return self.webQuery(params, parse_result, (callback, user_data))

    def listDrivers(self, model, callback, user_data=None,
                    extra_options=None):
        '''
        Obtain a list of printer drivers.

        @type model: string or cupshelpers.Device
        @param model: foomatic printer model string or a cupshelpers.Device
        object
        @type callback: function
        @param callback: callback function, taking (integer, user_data,
        result) parameters with the first parameter being the status code,
        zero for success.  On success the result is a dict mapping driver
        id to a dict of driver properties; if the reply cannot be handled
        the status is 1 and the result is a sys.exc_info() tuple; if the
        query itself failed its status and result are passed on unchanged.
        @type extra_options: string -> string dictionary
        @param extra_options: Additional search options, see
        http://www.linuxfoundation.org/en/OpenPrinting/Database/Query
        @return: query handle
        '''

        def parse_result(status, data, result):
            (callback, user_data) = data
            if status != 0:
                callback(status, user_data, result)
                # Without this return the callback would be called a
                # second time, with a parse error for the missing reply.
                return

            try:
                callback(0, user_data, _parse_drivers(result))
            except Exception:
                # NOTE: this also covers an exception raised by the
                # callback itself, which is then told about it through a
                # second call with status 1.
                callback(1, user_data, sys.exc_info())

        if isinstance(model, Device):
            model = model.id

        params = {
            'type': 'drivers',
            'moreinfo': '1',
            'showprinterid': '1',
            'onlynewestdriverpackages': '1',
            'architectures': platform.machine(),
            'noobsoletes': '1',
            'onlyfree': str(self.onlyfree),
            'onlymanufacturer': str(self.onlymanufacturer),
            'printer': model,
            'format': 'xml',
        }
        if extra_options:
            params.update(extra_options)

        return self.webQuery(params, parse_result, (callback, user_data))
-
-
-
def _simple_gui():
    '''
    Run a small PyGTK dialog for trying out OpenPrinting queries by hand.

    The dialog has a text entry, a text view and three buttons: "Search"
    looks up printers matching the entry text, "List" lists the drivers
    for the entry text, and Close leaves the main loop.  Results are
    shown in the text view.  Returns when the GTK main loop ends.
    '''
    import gtk
    import pprint
    gtk.gdk.threads_init()

    # Response ids of the two custom dialog buttons.
    RESPONSE_SEARCH = 10
    RESPONSE_LIST = 20

    def raise_failure(status, payload):
        # A failed query carries either a sys.exc_info() tuple or, when
        # the server answered with a non-200 status, no payload at all.
        if isinstance(payload, tuple):
            raise payload[1]

        raise RuntimeError('query failed with status %s' % status)

    class QueryApp:

        def __init__(self):
            self.openprinting = OpenPrinting()
            self.main = gtk.Dialog('OpenPrinting query application', None,
                                   gtk.DIALOG_MODAL | gtk.DIALOG_NO_SEPARATOR,
                                   (gtk.STOCK_CLOSE, gtk.RESPONSE_CLOSE,
                                    'Search', RESPONSE_SEARCH,
                                    'List', RESPONSE_LIST))
            self.main.set_border_width(6)
            self.main.vbox.set_spacing(2)
            vbox = gtk.VBox(False, 6)
            self.main.vbox.pack_start(vbox, True, True, 0)
            vbox.set_border_width(6)
            self.entry = gtk.Entry()
            vbox.pack_start(self.entry, False, False, 6)
            sw = gtk.ScrolledWindow()
            self.tv = gtk.TextView()
            sw.add(self.tv)
            vbox.pack_start(sw, True, True, 6)
            self.main.connect('response', self.response)
            self.main.show_all()

        def response(self, dialog, response):
            if (response == gtk.RESPONSE_CLOSE or
                    response == gtk.RESPONSE_DELETE_EVENT):
                gtk.main_quit()

            if response == RESPONSE_SEARCH:
                self.openprinting.searchPrinters(
                    self.entry.get_text(), self.search_printers_callback)

            if response == RESPONSE_LIST:
                self.openprinting.listDrivers(
                    self.entry.get_text(), self.list_drivers_callback)

        def show_text(self, text):
            # The callbacks run on the query thread, so the GDK lock is
            # taken around the widget update and released even on error.
            gtk.gdk.threads_enter()
            try:
                self.tv.get_buffer().set_text(text)
            finally:
                gtk.gdk.threads_leave()

        def search_printers_callback(self, status, user_data, printers):
            if status != 0:
                raise_failure(status, printers)

            text = ''.join([printer + '\n' for printer in printers.values()])
            self.show_text(text)

        def list_drivers_callback(self, status, user_data, drivers):
            if status != 0:
                raise_failure(status, drivers)

            self.show_text(pprint.pformat(drivers))

        def query_callback(self, status, user_data, result):
            text = str(result)
            self.show_text(text)
            # Keep a copy of the raw reply in the current directory.
            with open('result.xml', 'w') as output:
                output.write(text)

    q = QueryApp()
    gtk.main()
-
-